package com.bw;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.realtime.common.base.BaseApp;
import com.bw.gmall.realtime.common.bean.TradeSkuOrderBean;
import com.bw.gmall.realtime.common.constant.Constant;
import com.bw.gmall.realtime.common.util.DateFormatUtil;
import com.bw.gmall.realtime.common.util.HbaseUtil;
import com.bw.gmall.realtime.common.util.RedisUtil;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.client.AsyncConnection;

import java.io.IOException;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * DWS-layer Flink job that aggregates order-detail records per SKU in tumbling event-time
 * windows and enriches each aggregate with SKU and SPU dimension attributes.
 *
 * <p>Pipeline built by {@link #handle}:
 * <ol>
 *   <li>parse and filter the raw JSON records;</li>
 *   <li>assign event-time timestamps and watermarks;</li>
 *   <li>key by order-detail {@code id} and emit only the delta against the previously seen
 *       record of the same id, so repeated records are not counted twice downstream;</li>
 *   <li>key by {@code sku_id}, sum the measures per window and stamp the window bounds;</li>
 *   <li>asynchronously join {@code dim_sku_info} and then {@code dim_spu_info}, reading Redis
 *       first and falling back to HBase;</li>
 *   <li>print the result.</li>
 * </ol>
 */
public class DwsTradeSkuOrderAsyncCacheWindow extends BaseApp {

    /** Time-to-live of the per-order-detail state holding the last seen measures, in seconds. */
    private static final long STATE_TTL_SECONDS = 30L;

    /** Size of the tumbling event-time aggregation window, in seconds. */
    private static final long WINDOW_SIZE_SECONDS = 10L;

    /** Expiry of a dimension row cached in Redis, in seconds (one day). */
    private static final long DIM_CACHE_TTL_SECONDS = 24 * 3600L;

    /** Timeout handed to Flink's async operator for one dimension lookup, in seconds. */
    private static final long ASYNC_TIMEOUT_SECONDS = 100L;

    /**
     * Job entry point; delegates to {@code BaseApp.start}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // NOTE(review): the same constant is passed for the first two arguments; confirm against
        // BaseApp.start that this is intended.
        new DwsTradeSkuOrderAsyncCacheWindow().start(Constant.TOPIC_DWD_TRADE_ORDER_DETAIL, Constant.TOPIC_DWD_TRADE_ORDER_DETAIL, 1, 10029);
    }

    /**
     * Builds the streaming topology on top of the raw source stream.
     *
     * @param env              the execution environment supplied by the base application
     * @param dataStreamSource raw records, one JSON document per element
     */
    @Override
    public void handle(StreamExecutionEnvironment env, DataStreamSource<String> dataStreamSource) {
        // 1. ETL: drop null / unparsable records and records without sku_id or ts.
        SingleOutputStreamOperator<JSONObject> etlStream = dataStreamSource.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                // The upstream is a retract stream, so null payloads occur and must be filtered.
                if (value == null) {
                    return;
                }
                JSONObject record;
                try {
                    record = JSON.parseObject(value);
                } catch (Exception e) {
                    // Best effort: a single malformed record must not fail the job.
                    e.printStackTrace();
                    return;
                }
                if (record == null) {
                    return;
                }
                String skuId = record.getString("sku_id");
                Long ts = record.getLong("ts");
                if (skuId == null || ts == null) {
                    return;
                }
                // The multiplication turns the incoming ts into the millisecond value used as
                // event time below.
                record.put("ts", ts * 1000);
                // Emitted outside the try block so that failures raised by chained downstream
                // operators propagate instead of being swallowed as "dirty data".
                out.collect(record);
            }
        });

        // 2. Watermarks: tolerate up to 3 seconds of out-of-orderness.
        SingleOutputStreamOperator<JSONObject> wmStream = etlStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject record, long recordTimestamp) {
                                return record.getLong("ts");
                            }
                        }));

        // 3. Key by order-detail id.
        KeyedStream<JSONObject, String> detailKeyedStream = wmStream.keyBy(new KeySelector<JSONObject, String>() {
            @Override
            public String getKey(JSONObject record) throws Exception {
                return record.getString("id");
            }
        });

        // 4. Prevent double counting downstream by emitting deltas.
        SingleOutputStreamOperator<TradeSkuOrderBean> processStream = detailKeyedStream.process(new ProcessFunction<JSONObject, TradeSkuOrderBean>() {

            /** Last measures seen for the current order-detail id, keyed by measure name. */
            private MapState<String, BigDecimal> mapState;

            /*
             * Strategy used here - emit the difference to the previous record immediately:
             *   {"id":10,"split_total_amount":10} -> emits 10 - 0  = 10
             *   {"id":10,"split_total_amount":15} -> emits 15 - 10 = 5
             * Advantage: low latency.
             *
             * Alternative - timer based: buffer the records per id, start a 10-second timer when
             * the first record arrives, emit only the last buffered record when the timer fires
             * and clear the state. Drawback: every record is delayed by 10 seconds.
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                MapStateDescriptor<String, BigDecimal> stateDescriptor =
                        new MapStateDescriptor<String, BigDecimal>("order_state", String.class, BigDecimal.class);
                stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.seconds(STATE_TTL_SECONDS)).build());
                mapState = getRuntimeContext().getMapState(stateDescriptor);
            }

            @Override
            public void processElement(JSONObject jsonObject, ProcessFunction<JSONObject, TradeSkuOrderBean>.Context context, Collector<TradeSkuOrderBean> collector) throws Exception {
                // Previously seen measures for this id (zero when this is the first record).
                BigDecimal prevTotal = zeroIfNull(mapState.get("splitTotalAmount"));
                BigDecimal prevActivity = zeroIfNull(mapState.get("splitActivityAmount"));
                BigDecimal prevCoupon = zeroIfNull(mapState.get("splitCouponAmount"));
                BigDecimal prevOriginal = zeroIfNull(mapState.get("originalAmount"));

                // Measures of the current record; absent fields count as zero instead of
                // raising a NullPointerException.
                BigDecimal curTotal = zeroIfNull(jsonObject.getBigDecimal("split_total_amount"));
                BigDecimal curActivity = zeroIfNull(jsonObject.getBigDecimal("split_activity_amount"));
                BigDecimal curCoupon = zeroIfNull(jsonObject.getBigDecimal("split_coupon_amount"));
                BigDecimal skuNum = zeroIfNull(jsonObject.getBigDecimal("sku_num"));
                BigDecimal orderPrice = zeroIfNull(jsonObject.getBigDecimal("order_price"));

                // Original amount = unit price * quantity.
                BigDecimal curOriginal = orderPrice.multiply(skuNum);

                // Emit the delta straight away.
                collector.collect(TradeSkuOrderBean.builder()
                        .orderDetailId(jsonObject.getString("id"))
                        .skuName(jsonObject.getString("sku_name"))
                        .ts(jsonObject.getLong("ts"))
                        .skuId(jsonObject.getString("sku_id"))
                        .originalAmount(curOriginal.subtract(prevOriginal))
                        .activityReduceAmount(curActivity.subtract(prevActivity))
                        .couponReduceAmount(curCoupon.subtract(prevCoupon))
                        .orderAmount(curTotal.subtract(prevTotal))
                        .build());

                // Remember the current measures for the next record of the same id.
                mapState.put("splitTotalAmount", curTotal);
                mapState.put("splitActivityAmount", curActivity);
                mapState.put("splitCouponAmount", curCoupon);
                mapState.put("originalAmount", curOriginal);
            }
        });

        // 5. Key by sku_id.
        KeyedStream<TradeSkuOrderBean, String> skuKeyedStream = processStream.keyBy(new KeySelector<TradeSkuOrderBean, String>() {
            @Override
            public String getKey(TradeSkuOrderBean bean) throws Exception {
                return bean.getSkuId();
            }
        });

        // 6. Aggregate per tumbling event-time window.
        SingleOutputStreamOperator<TradeSkuOrderBean> reduceStream = skuKeyedStream
                .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(WINDOW_SIZE_SECONDS)))
                .reduce(new ReduceFunction<TradeSkuOrderBean>() {
                    @Override
                    public TradeSkuOrderBean reduce(TradeSkuOrderBean value1, TradeSkuOrderBean value2) throws Exception {
                        value1.setOriginalAmount(value1.getOriginalAmount().add(value2.getOriginalAmount()));
                        value1.setCouponReduceAmount(value1.getCouponReduceAmount().add(value2.getCouponReduceAmount()));
                        value1.setActivityReduceAmount(value1.getActivityReduceAmount().add(value2.getActivityReduceAmount()));
                        value1.setOrderAmount(value1.getOrderAmount().add(value2.getOrderAmount()));
                        return value1;
                    }
                }, new ProcessWindowFunction<TradeSkuOrderBean, TradeSkuOrderBean, String, TimeWindow>() {
                    @Override
                    public void process(String key, ProcessWindowFunction<TradeSkuOrderBean, TradeSkuOrderBean, String, TimeWindow>.Context context, Iterable<TradeSkuOrderBean> elements, Collector<TradeSkuOrderBean> out) throws Exception {
                        // This function exists only to attach the window bounds to the result.
                        TimeWindow window = context.window();
                        String stt = DateFormatUtil.tsToDateTime(window.getStart());
                        String edt = DateFormatUtil.tsToDateTime(window.getEnd());
                        // The date is derived from the window start rather than the wall clock so
                        // that replays and late processing yield the same value.
                        String curDate = DateFormatUtil.tsToDate(window.getStart());
                        for (TradeSkuOrderBean bean : elements) {
                            bean.setStt(stt);
                            bean.setEdt(edt);
                            bean.setCurDate(curDate);
                            out.collect(bean);
                        }
                    }
                });

        // 7. Asynchronous dimension joins (Redis cache in front of HBase).
        // 7.1 SKU dimension: provides spu_id, tm_id and category3_id.
        SingleOutputStreamOperator<TradeSkuOrderBean> skuJoinedStream = AsyncDataStream.unorderedWait(reduceStream, new DimAsyncFunction() {
            @Override
            protected String getTableName() {
                return "dim_sku_info";
            }

            @Override
            protected String getRowKey(TradeSkuOrderBean bean) {
                return bean.getSkuId();
            }

            @Override
            protected void join(TradeSkuOrderBean bean, JSONObject dim) {
                bean.setSpuId(dim.getString("spu_id"));
                bean.setTrademarkId(dim.getString("tm_id"));
                bean.setCategory3Id(dim.getString("category3_id"));
            }
        }, ASYNC_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // 7.2 SPU dimension: provides spu_name; keyed by the spu_id joined in the previous step.
        SingleOutputStreamOperator<TradeSkuOrderBean> spuJoinedStream = AsyncDataStream.unorderedWait(skuJoinedStream, new DimAsyncFunction() {
            @Override
            protected String getTableName() {
                return "dim_spu_info";
            }

            @Override
            protected String getRowKey(TradeSkuOrderBean bean) {
                return bean.getSpuId();
            }

            @Override
            protected void join(TradeSkuOrderBean bean, JSONObject dim) {
                bean.setSpuName(dim.getString("spu_name"));
            }
        }, ASYNC_TIMEOUT_SECONDS, TimeUnit.SECONDS);

        // 8. Sink. TODO: write to Doris; the stream is only printed for now.
        spuJoinedStream.print();
    }

    /**
     * Returns {@code value}, or {@link BigDecimal#ZERO} when {@code value} is {@code null}.
     */
    private static BigDecimal zeroIfNull(BigDecimal value) {
        return value == null ? BigDecimal.ZERO : value;
    }

    /**
     * Async function that enriches a {@link TradeSkuOrderBean} with one dimension row.
     *
     * <p>The row is looked up in Redis first; on a miss it is read from HBase and written back to
     * Redis with an expiry of {@link #DIM_CACHE_TTL_SECONDS}. The result future is always
     * completed: with the (possibly un-enriched) bean on success or a missing row, and
     * exceptionally when the lookup itself fails unexpectedly, so a record never has to wait for
     * the operator timeout.
     */
    private abstract static class DimAsyncFunction extends RichAsyncFunction<TradeSkuOrderBean, TradeSkuOrderBean> {

        // Both connections are created in open(), so they are excluded from serialization.
        private transient StatefulRedisConnection<String, String> redisAsyncConnection;
        private transient AsyncConnection hBaseAsyncConnection;

        /** Name of the dimension table to query. */
        protected abstract String getTableName();

        /** Row key of the dimension row belonging to {@code bean}; may be {@code null}. */
        protected abstract String getRowKey(TradeSkuOrderBean bean);

        /** Copies the needed attributes from the non-null dimension row {@code dim} into {@code bean}. */
        protected abstract void join(TradeSkuOrderBean bean, JSONObject dim);

        @Override
        public void open(Configuration parameters) throws Exception {
            hBaseAsyncConnection = HbaseUtil.getHBaseAsyncConnection();
            redisAsyncConnection = RedisUtil.getRedisAsyncConnection();
        }

        @Override
        public void close() throws Exception {
            try {
                HbaseUtil.closeAsyncConnection(hBaseAsyncConnection);
            } finally {
                // Release the Redis connection even if closing HBase failed.
                RedisUtil.closeRedisAsyncConnection(redisAsyncConnection);
            }
        }

        @Override
        public void asyncInvoke(TradeSkuOrderBean tradeSkuOrderBean, ResultFuture<TradeSkuOrderBean> resultFuture) throws Exception {
            final String table = getTableName();
            final String rowKey = getRowKey(tradeSkuOrderBean);

            // Without a key there is nothing to look up (e.g. an upstream join found no row);
            // pass the bean through unchanged.
            if (rowKey == null) {
                System.out.println("没有查到维度数据:" + table + ":" + rowKey);
                resultFuture.complete(Collections.singletonList(tradeSkuOrderBean));
                return;
            }

            final String redisKey = RedisUtil.getRedisKey(table, rowKey);
            CompletableFuture
                    .supplyAsync(() -> readFromCache(redisKey))
                    .thenApplyAsync(cached -> loadDimension(cached, table, rowKey, redisKey))
                    .whenComplete((dim, error) -> {
                        if (error != null) {
                            // Surface the failure now instead of leaving the future pending
                            // until the operator timeout.
                            resultFuture.completeExceptionally(error);
                            return;
                        }
                        try {
                            if (dim != null) {
                                join(tradeSkuOrderBean, dim);
                            } else {
                                System.out.println("没有查到维度数据:" + table + ":" + rowKey);
                            }
                            resultFuture.complete(Collections.singletonList(tradeSkuOrderBean));
                        } catch (RuntimeException e) {
                            resultFuture.completeExceptionally(e);
                        }
                    });
        }

        /**
         * Reads the cached JSON text for {@code redisKey}.
         *
         * @return the cached value, or {@code null} on a miss or when the read fails
         */
        private String readFromCache(String redisKey) {
            RedisFuture<String> pending = redisAsyncConnection.async().get(redisKey);
            try {
                return pending.get();
            } catch (InterruptedException e) {
                // Restore the interrupt status for the owner of this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                // A failed cache read is treated as a miss; HBase is consulted next.
                e.printStackTrace();
            }
            return null;
        }

        /**
         * Resolves the dimension row from the cached text or, on a miss, from HBase.
         *
         * @return the dimension row, or {@code null} when it could not be obtained
         */
        private JSONObject loadDimension(String cached, String table, String rowKey, String redisKey) {
            if (cached != null && cached.length() > 0) {
                // Cache hit: parse the stored JSON text.
                return JSON.parseObject(cached);
            }
            try {
                JSONObject dim = HbaseUtil.getAsyncCells(hBaseAsyncConnection, Constant.HBASE_NAMESPACE, table, rowKey);
                // Only cache real rows: a null result must not be dereferenced, and an empty one
                // must not hide a row that is written later for the whole cache lifetime.
                if (dim != null && !dim.isEmpty()) {
                    redisAsyncConnection.async().setex(redisKey, DIM_CACHE_TTL_SECONDS, dim.toJSONString());
                }
                return dim;
            } catch (IOException e) {
                // Best effort: the caller emits the bean without this dimension.
                e.printStackTrace();
                return null;
            }
        }
    }
}
